Prevent OOM when receiving large request streams #3174

Open · wants to merge 4 commits into main

Conversation

@davidlar (Contributor):

This PR aims to fix #3173 by making sure we don't read more data than we can handle.
ctx.read() is now called after the queue is drained by the consuming end of the stream; the call to ctx.read() fills the queue with more data asynchronously.
Netty's AUTO_READ flag is already turned off in zio-http, and ctx.read() should be called when we are ready for more data, not before.

I'm not sure whether we should keep the unbounded queue or not. I guess it will contain at most one chunk.
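
For illustration, here is a minimal sketch of the read-on-drain pattern described above. The queue and the ctx.read() call are from the PR; the function name and the stream wiring are hypothetical, not zio-http's actual code:

    import io.netty.channel.ChannelHandlerContext
    import zio._
    import zio.stream._

    // With AUTO_READ off, Netty delivers the next chunk only after an
    // explicit ctx.read(), so the consumer controls the inflow.
    def bodyStream(ctx: ChannelHandlerContext, queue: Queue[Chunk[Byte]]): ZStream[Any, Nothing, Byte] = {
      val nettyRead: UIO[Unit] = ZIO.succeed { ctx.read(); () }

      ZStream
        .fromQueue(queue)
        // Request more data only once the queue is drained, so the
        // buffer holds roughly one chunk at a time.
        .tap(_ => nettyRead.whenZIO(queue.isEmpty))
        .flattenChunks
    }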

@kyri-petrou (Collaborator) left a comment:

Thanks for the fix! AFAICT the fix works, although the part that worries me a bit is that with these changes we're delegating ctx.read() to a different thread pool, which will add a fair bit of overhead due to context switching.

I'm wondering whether we should add a continueReading: () => Boolean argument to UnsafeAsync.Streaming. That way we can decide in AsyncBodyReader whether to invoke ctx.read() or defer it. The part that worries me with this is that we might be adding too much complexity. What do you think?
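
For concreteness, a hypothetical sketch of that suggestion is below. The signatures are illustrative and do not match zio-http's actual UnsafeAsync API:

    import io.netty.channel.ChannelHandlerContext
    import zio.Chunk

    // Hypothetical: Streaming carries a predicate the Netty-side reader can
    // consult synchronously, e.g. "is the downstream buffer below its watermark?"
    final case class Streaming(
      onMessage: Chunk[Byte] => Unit,
      continueReading: () => Boolean,
    )

    // Sketch of the reader's receive path: read eagerly on the event loop
    // while the consumer has room; otherwise the consuming fiber triggers
    // ctx.read() itself once it drains the queue.
    def onChannelRead(ctx: ChannelHandlerContext, data: Chunk[Byte], s: Streaming): Unit = {
      s.onMessage(data)
      if (s.continueReading()) { ctx.read(); () }
    }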

@@ -166,7 +174,7 @@ object NettyBody extends BodyEncoding {
       maybeError =>
         ZChannel.fromZIO(queue.shutdown) *>
           maybeError.fold[ZChannel[Any, Any, Any, Any, E, Chunk[A], Unit]](ZChannel.unit)(ZChannel.fail(_)),
-      a => ZChannel.write(a) *> loop,
+      a => ZChannel.write(a) *> ZChannel.fromZIO(nettyRead.whenZIO(queue.isEmpty)) *> loop,

@kyri-petrou (Collaborator):

I'm wondering whether we should buffer reads here, e.g., nettyRead.whenZIO(queue.size.map(_ < XXX))

Previously this was set to 4096 (using a bounded queue, which we can't use anymore since it was blocking Netty's threads) and there weren't any issues, right?
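
Concretely, a size-based watermark might look like the following sketch, where nettyRead and queue are as in the diff above and the watermark value is illustrative:

    import zio._
    import zio.stream.ZChannel

    // Illustrative: keep reading while fewer than `watermark` chunks are
    // queued, instead of only when the queue is fully drained.
    def maybeRead(nettyRead: UIO[Unit], queue: Queue[Chunk[Byte]], watermark: Int) =
      ZChannel.fromZIO(nettyRead.whenZIO(queue.size.map(_ < watermark)))

    // Used in the loop as: a => ZChannel.write(a) *> maybeRead(...) *> loop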

@kyri-petrou (Collaborator):

Also, two notes on performance:

  1. Use whenZIODiscard instead of whenZIO.
  2. I think it might be better to store this ZChannel in a val to avoid creating it on each loop:

val maybeRead = ZChannel.fromZIO(nettyRead.whenZIODiscard(???))

// And then
a => ZChannel.write(a) *> maybeRead *> loop,

@davidlar (Contributor, Author):

I'll apply the suggested changes in NettyBody!

@davidlar (Contributor, Author):

Not sure I understand how ctx.read() can be called from AsyncBodyReader, as it has to be called from the consuming side of the queue. So it's probably too complex, for me at least :-)

@davidlar (Contributor, Author):

I tried locally with queue.size.map(_ < 4096), and it can still buffer a lot of data, since the chunks in the queue can be big in some cases.
Maybe we should count the queued bytes in a ref or something instead, or lower that limit to 128 or so.
What do you think @kyri-petrou?
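
A hedged sketch of the byte-counting idea, with hypothetical names and an illustrative 64 KiB watermark:

    import zio._
    import zio.stream.ZChannel

    // Producer (Netty handler) side would record each chunk's size on
    // enqueue, e.g. queuedBytes.update(_ + chunk.length).
    // Consumer side: after emitting a chunk, subtract its size and ask
    // Netty for more only while the buffered byte count is under the limit.
    def drainStep(
      a: Chunk[Byte],
      queuedBytes: Ref[Int],
      nettyRead: UIO[Unit],
      watermark: Int = 64 * 1024,
    ): ZChannel[Any, Any, Any, Any, Nothing, Chunk[Byte], Unit] =
      ZChannel.write(a) *>
        ZChannel.fromZIO(
          queuedBytes.updateAndGet(_ - a.length).flatMap(n => nettyRead.when(n < watermark).unit)
        )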

@davidlar (Contributor, Author) commented Sep 27, 2024:

It feels like the buffering before #3060 was meant to be a maximum of 4096 bytes, but was really a maximum of 4096 chunks, which is not what we want, I guess...
On the other hand, I haven't seen chunks smaller than 8 kB when testing this, so maybe queue.isEmpty is a good choice after all.
I'll leave it like that if you want to approve! I'm away for the weekend...

Development

Successfully merging this pull request may close these issues.

OOM when receiving large streaming requests